zmq 模式 ROUTER和DEALER 的实例

您所在的位置:网站首页 broker dealer 区别 zmq 模式 ROUTER和DEALER 的实例

zmq 模式 ROUTER和DEALER 的实例

2023-10-05 06:36| 来源: 网络整理| 查看: 265

(1)对于Request类型的socket,它是同步的,它一个时刻只能对一个连接进行操作,在一个连接上发送了数据之后,必须接着在这个连接上执行recv,也就是send与recv必须同时匹配出现

(2)Response类型的socket也是同步的,与Request的意思差不多,不过顺序是先recv再send

(3)Router类型的socket是异步的,他可以在随时执行recv与send,而不必在同一时刻必须要强制在某个连接上进行操作,它会根据标志帧来具体的区分应该在哪一个链接上进行操作

(4)Dealer类型的socket,这个更简单的,异步。。。它基本上就没做啥工作。

ROUTER和DEALER模式

DEALER就像是一个异步的REQ,而ROUTER就像一个异步的REP。所以可以相互使用。

ROUTER做代理可以提供可靠的模式来分别识别客户端和后端服务器。

大部分的关键字都是可以相互之间建立新的模式或者组合新的模式。

这些只是基础的模式,根据需求可以设计自己需要的模式。

ROUTER担任代理

graph LR

clientA-->ROUTER1(接收客户端消息) clientB-->ROUTER1 clientC-->ROUTER1 ROUTER2-->DEALER(服务端连接)-->workerA -->ROUTER2 ROUTER2-->DEALER(服务端连接)-->workerB -->ROUTER2 ROUTER2-->DEALER(服务端连接)-->workerC -->ROUTER2 ROUTER1-->clientA (客户端的连接 将服务端处理完接收到的消息发送给客户端) ROUTER1-->clientB ROUTER1-->clientC ROUTER1和ROUTER2在轮询,交互消息 例子中 frontend 和 backend 就是这两个连接 ROUTER1(绑定客户端)作用:接收客户端消息 服务端处理完 将信息返回给客户端 ROUTER2(绑定服务端)作用:绑定服务端的DEALER连接 来做数据交互 DEALER (接收服务端ROUTER2转发的数据 处理之后 返回给ROUTER2)

异步的交互消息 N:M

例子: 转发消息 ROUTER fontend 6001 (与客户端连接,接收客户端消息,将服务端处理完的消息返回给客户端) ROUTER backend 6002(与服务端连接,转发给服务器消息,并接收服务端处理完的消息) worker (处理消息) DEALER 6002 (连接6002 接收转发的消息并处理 返回给6002连接)

中间处理的代码

import zmq context = zmq.Context() frontend = context.socket(zmq.ROUTER) frontend.bind(“tcp://*:6001”) backend = context.socket(zmq.ROUTER) backend.bind("tcp://*:6002") frontend.setsockopt(zmq.RCVHWM, 100) # backend.setsockopt(zmq.SNDHWM, 1) workers_list = OrderedDict() clients_list = {} poller = zmq.Poller() poller.register(backend, zmq.POLLIN) poller.register(frontend, zmq.POLLIN) clients_list = {} #记录客户端的连接信息 while True: socks = dict(poller.poll(1000)) #轮询处理 if (backend in socks and socks[backend] == zmq.POLLIN): worker_addr,client_addr,reply = backend.recv_multipart() #接受服务器处理完的信息 带地址 workers_list[worker_addr]=1 #记录worker的地址 if client_addr in clients_list: #服务端处理完客户端的消息 frontend.send_multipart([client_addr, reply]) #客户端将服务端处理完的消息返回给客户端 else: pass #没有这个客户端 做一些记录 if len(workers_list) > 0: if (frontend in socks and socks[frontend] == zmq.POLLIN): client_addr,request = frontend.recv_multipart() #接收客户端的信息 包含客户端地址 clients_list[client_addr] = 1 #记录客户端的连接地址 worker_id = workers_list.popitem(False)[0] #记录 id backend.send_multipart([worker_id,client_addr,request]) #转发客户端的消息和id地址 让服务端处理

服务端代码

import zmq context = zmq.Context() responser = context.socket(zmq.DEALER) responser.setsockopt(zmq.RCVTIMEO, 10000) responser.connect("tcp://localhost:6002") while True: ident, message = response_socket.recv_multipart() pass _response = "1" #处理完的数据 response_socket.send_multipart([ident, _response])


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3